/**
 * Copyright (C) 2010-2013 Alibaba Group Holding Limited
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.rocketmq.namesrv.processor;

import io.netty.channel.ChannelHandlerContext;

import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.rocketmq.common.MQVersion;
import com.alibaba.rocketmq.common.MQVersion.Version;
import com.alibaba.rocketmq.common.constant.LoggerName;
import com.alibaba.rocketmq.common.help.FAQUrl;
import com.alibaba.rocketmq.common.namesrv.NamesrvUtil;
import com.alibaba.rocketmq.common.namesrv.RegisterBrokerResult;
import com.alibaba.rocketmq.common.protocol.RequestCode;
import com.alibaba.rocketmq.common.protocol.ResponseCode;
import com.alibaba.rocketmq.common.protocol.body.RegisterBrokerBody;
import com.alibaba.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import com.alibaba.rocketmq.common.protocol.header.GetTopicsByClusterRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.namesrv.DeleteKVConfigRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.namesrv.DeleteTopicInNamesrvRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.namesrv.GetKVConfigRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.namesrv.GetKVConfigResponseHeader;
import com.alibaba.rocketmq.common.protocol.header.namesrv.GetKVListByNamespaceRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.namesrv.PutKVConfigRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader;
import com.alibaba.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerResponseHeader;
import com.alibaba.rocketmq.common.protocol.route.TopicRouteData;
import com.alibaba.rocketmq.namesrv.NamesrvController;
import com.alibaba.rocketmq.remoting.common.RemotingHelper;
import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;

/**
 * Name Server网络请求处理
 * 
 * @author shijia.wxr<vintage.wang@gmail.com>
 * @since 2013-7-5
 */
public class DefaultRequestProcessor implements NettyRequestProcessor {
	private static final Logger log = LoggerFactory.getLogger(LoggerName.NamesrvLoggerName);

	private final NamesrvController namesrvController;

	public DefaultRequestProcessor(NamesrvController namesrvController) {
		this.namesrvController = namesrvController;
	}

	@Override
	public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
		if (log.isDebugEnabled()) {
			log.debug("receive request, {} {} {}", //
					request.getCode(), //
					RemotingHelper.parseChannelRemoteAddr(ctx.channel()), //
					request);
		}

		switch (request.getCode()) {
			case RequestCode.PUT_KV_CONFIG : // 向Namesrv追加KV配置
				return this.putKVConfig(ctx, request);
			case RequestCode.GET_KV_CONFIG : // 从Namesrv获取KV配置
				return this.getKVConfig(ctx, request);
			case RequestCode.DELETE_KV_CONFIG :// 从Namesrv删除KV配置
				return this.deleteKVConfig(ctx, request);
			case RequestCode.REGISTER_BROKER : // 注册一个Broker，数据都是持久化的，如果存在则覆盖配置
				Version brokerVersion = MQVersion.value2Version(request.getVersion());
				// 新版本Broker，支持Filter Server
				if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
					return this.registerBrokerWithFilterServer(ctx, request);
				}
				// 低版本Broker，不支持Filter Server
				else {
					return this.registerBroker(ctx, request);
				}
			case RequestCode.UNREGISTER_BROKER : // 卸载一个Broker，数据都是持久化的
				return this.unregisterBroker(ctx, request);
			case RequestCode.GET_ROUTEINTO_BY_TOPIC : // 根据Topic获取Broker Name、topic配置信息
				return this.getRouteInfoByTopic(ctx, request);
			case RequestCode.GET_BROKER_CLUSTER_INFO : // 获取注册到Name Server的所有Broker集群信息
				return this.getBrokerClusterInfo(ctx, request);
			case RequestCode.WIPE_WRITE_PERM_OF_BROKER : // 去掉BrokerName的写权限
				return this.wipeWritePermOfBroker(ctx, request);
			case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER : // 从Name Server获取完整Topic列表
				return getAllTopicListFromNameserver(ctx, request);
			case RequestCode.DELETE_TOPIC_IN_NAMESRV : // 从Namesrv删除Topic配置
				return deleteTopicInNamesrv(ctx, request);
			case RequestCode.GET_KV_CONFIG_BY_VALUE : // 通过 project 获取所有的 server ip 信息
				return getKVConfigByValue(ctx, request);
			case RequestCode.DELETE_KV_CONFIG_BY_VALUE : // 删除指定 project group 下的所有 server ip 信息
				return deleteKVConfigByValue(ctx, request);
			case RequestCode.GET_KVLIST_BY_NAMESPACE : // 通过NameSpace获取所有的KV List
				return this.getKVListByNamespace(ctx, request);
			case RequestCode.GET_TOPICS_BY_CLUSTER : // 获取指定集群下的所有 topic
				return this.getTopicsByCluster(ctx, request);
			case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS : // 获取所有系统内置 Topic 列表
				return this.getSystemTopicListFromNs(ctx, request);
			case RequestCode.GET_UNIT_TOPIC_LIST : // 单元化相关 topic
				return this.getUnitTopicList(ctx, request);
			case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST : // 获取含有单元化订阅组的 Topic 列表
				return this.getHasUnitSubTopicList(ctx, request);
			case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST : // 获取含有单元化订阅组的非单元化 Topic 列表
				return this.getHasUnitSubUnUnitTopicList(ctx, request);
			default :
				break;
		}
		return null;
	}

	public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
		final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
		final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
		final RegisterBrokerRequestHeader requestHeader = (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);

		RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();

		if (request.getBody() != null) {
			registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), RegisterBrokerBody.class);
		} else {
			registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0));
			registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestatmp(0);
		}
		// 注册完成后返回给Broker端主用Broker的地址，以及该主用Broker的HA服务地址
		RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(//
				requestHeader.getClusterName(), // 1
				requestHeader.getBrokerAddr(), // 2
				requestHeader.getBrokerName(), // 3
				requestHeader.getBrokerId(), // 4
				requestHeader.getHaServerAddr(), // 5
				registerBrokerBody.getTopicConfigSerializeWrapper(), // 6
				registerBrokerBody.getFilterServerList(), //
				ctx.channel()// 7
		);

		responseHeader.setHaServerAddr(result.getHaServerAddr());
		responseHeader.setMasterAddr(result.getMasterAddr());

		// 获取顺序消息 topic 列表
		byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
		response.setBody(jsonValue);

		response.setCode(ResponseCode.SUCCESS);
		response.setRemark(null);
		return response;
	}

	/**
	 * 获取一个Namespace下的所有kv
	 */
	private RemotingCommand getKVListByNamespace(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
		final RemotingCommand response = RemotingCommand.createResponseCommand(null);
		final GetKVListByNamespaceRequestHeader requestHeader = (GetKVListByNamespaceRequestHeader) request.decodeCommandCustomHeader(GetKVListByNamespaceRequestHeader.class);

		byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(//
				requestHeader.getNamespace());
		if (null != jsonValue) {
			response.setBody(jsonValue);
			response.setCode(ResponseCode.SUCCESS);
			response.setRemark(null);
			return response;
		}

		response.setCode(ResponseCode.QUERY_NOT_FOUND);
		response.setRemark("No config item, Namespace: " + requestHeader.getNamespace());
		return response;
	}

	private RemotingCommand deleteTopicInNamesrv(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
		final RemotingCommand response = RemotingCommand.createResponseCommand(null);
		final DeleteTopicInNamesrvRequestHeader requestHeader = (DeleteTopicInNamesrvRequestHeader) request.decodeCommandCustomHeader(DeleteTopicInNamesrvRequestHeader.class);

		this.namesrvController.getRouteInfoManager().deleteTopic(requestHeader.getTopic());

		response.setCode(ResponseCode.SUCCESS);
		response.setRemark(null);
		return response;
	}

	/**
	 * 获取全部Topic列表
	 * 
	 * @param ctx
	 * @param request
	 * @return
	 */
	private RemotingCommand getAllTopicListFromNameserver(ChannelHandlerContext ctx, RemotingCommand request) {
		final RemotingCommand response = RemotingCommand.createResponseCommand(null);

		byte[] body = this.namesrvController.getRouteInfoManager().getAllTopicList();

		response.setBody(body);
		response.setCode(ResponseCode.SUCCESS);
		response.setRemark(null);
		return response;
	}

	private RemotingCommand wipeWritePermOfBroker(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
		final RemotingCommand response = RemotingCommand.createResponseCommand(WipeWritePermOfBrokerResponseHeader.class);
		final WipeWritePermOfBrokerResponseHeader responseHeader = (WipeWritePermOfBrokerResponseHeader) response.readCustomHeader();
		final WipeWritePermOfBrokerRequestHeader requestHeader = (WipeWritePermOfBrokerRequestHeader) request.decodeCommandCustomHeader(WipeWritePermOfBrokerRequestHeader.class);

		int wipeTopicCnt = this.namesrvController.getRouteInfoManager().wipeWritePermOfBrokerByLock(requestHeader.getBrokerName());

		log.info("wipe write perm of broker[{}], client: {}, {}", //
				requestHeader.getBrokerName(), //
				RemotingHelper.parseChannelRemoteAddr(ctx.channel()), //
				wipeTopicCnt);

		responseHeader.setWipeTopicCount(wipeTopicCnt);
		response.setCode(ResponseCode.SUCCESS);
		response.setRemark(null);
		return response;
	}

	private RemotingCommand getBrokerClusterInfo(ChannelHandlerContext ctx, RemotingCommand request) {
		final RemotingCommand response = RemotingCommand.createResponseCommand(null);

		byte[] content = this.namesrvController.getRouteInfoManager().getAllClusterInfo();
		response.setBody(content);

		response.setCode(ResponseCode.SUCCESS);
		response.setRemark(null);
		return response;
	}

	public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
		final RemotingCommand response = RemotingCommand.createResponseCommand(null);
		final GetRouteInfoRequestHeader requestHeader = (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);

		TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());

		if (topicRouteData != null) {
			// broker的顺序---参数格式为：以";"解析成数组，数组的每个成员是以":"分隔的，构成数据"brokerName:queueNum"；
			String orderTopicConf = this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, requestHeader.getTopic());
			topicRouteData.setOrderTopicConf(orderTopicConf);

			byte[] content = topicRouteData.encode();
			response.setBody(content);
			response.setCode(ResponseCode.SUCCESS);
			response.setRemark(null);
			return response;
		}

		response.setCode(ResponseCode.TOPIC_NOT_EXIST);
		response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic() + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
		return response;
	}

	public RemotingCommand putKVConfig(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
		final RemotingCommand response = RemotingCommand.createResponseCommand(null);
		final PutKVConfigRequestHeader requestHeader = (PutKVConfigRequestHeader) request.decodeCommandCustomHeader(PutKVConfigRequestHeader.class);

		this.namesrvController.getKvConfigManager().putKVConfig(//
				requestHeader.getNamespace(), //
				requestHeader.getKey(), //
				requestHeader.getValue()//
		);

		response.setCode(ResponseCode.SUCCESS);
		response.setRemark(null);
		return response;
	}

	public RemotingCommand getKVConfig(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
		final RemotingCommand response = RemotingCommand.createResponseCommand(GetKVConfigResponseHeader.class);
		final GetKVConfigResponseHeader responseHeader = (GetKVConfigResponseHeader) response.readCustomHeader();
		final GetKVConfigRequestHeader requestHeader = (GetKVConfigRequestHeader) request.decodeCommandCustomHeader(GetKVConfigRequestHeader.class);

		String value = this.namesrvController.getKvConfigManager().getKVConfig(//
				requestHeader.getNamespace(), //
				requestHeader.getKey()//
		);

		if (value != null) {
			responseHeader.setValue(value);
			response.setCode(ResponseCode.SUCCESS);
			response.setRemark(null);
			return response;
		}

		response.setCode(ResponseCode.QUERY_NOT_FOUND);
		response.setRemark("No config item, Namespace: " + requestHeader.getNamespace() + " Key: " + requestHeader.getKey());
		return response;
	}

	public RemotingCommand deleteKVConfig(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
		final RemotingCommand response = RemotingCommand.createResponseCommand(null);
		final DeleteKVConfigRequestHeader requestHeader = (DeleteKVConfigRequestHeader) request.decodeCommandCustomHeader(DeleteKVConfigRequestHeader.class);

		this.namesrvController.getKvConfigManager().deleteKVConfig(//
				requestHeader.getNamespace(), //
				requestHeader.getKey()//
		);

		response.setCode(ResponseCode.SUCCESS);
		response.setRemark(null);
		return response;
	}

	public RemotingCommand registerBroker(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
		final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
		final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
		final RegisterBrokerRequestHeader requestHeader = (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);

		TopicConfigSerializeWrapper topicConfigWrapper = null;
		if (request.getBody() != null) {
			topicConfigWrapper = TopicConfigSerializeWrapper.decode(request.getBody(), TopicConfigSerializeWrapper.class);
		} else {
			topicConfigWrapper = new TopicConfigSerializeWrapper();
			topicConfigWrapper.getDataVersion().setCounter(new AtomicLong(0));
			topicConfigWrapper.getDataVersion().setTimestatmp(0);
		}

		RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(//
				requestHeader.getClusterName(), // 1
				requestHeader.getBrokerAddr(), // 2
				requestHeader.getBrokerName(), // 3
				requestHeader.getBrokerId(), // 4
				requestHeader.getHaServerAddr(), // 5
				topicConfigWrapper, // 6
				null, //
				ctx.channel()// 7
		);

		responseHeader.setHaServerAddr(result.getHaServerAddr());
		responseHeader.setMasterAddr(result.getMasterAddr());

		// 获取顺序消息 topic 列表
		byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
		response.setBody(jsonValue);
		response.setCode(ResponseCode.SUCCESS);
		response.setRemark(null);
		return response;
	}

	public RemotingCommand unregisterBroker(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
		final RemotingCommand response = RemotingCommand.createResponseCommand(null);
		final UnRegisterBrokerRequestHeader requestHeader = (UnRegisterBrokerRequestHeader) request.decodeCommandCustomHeader(UnRegisterBrokerRequestHeader.class);

		this.namesrvController.getRouteInfoManager().unregisterBroker(//
				requestHeader.getClusterName(), // 1
				requestHeader.getBrokerAddr(), // 2
				requestHeader.getBrokerName(), // 3
				requestHeader.getBrokerId());

		response.setCode(ResponseCode.SUCCESS);
		response.setRemark(null);
		return response;
	}

	public RemotingCommand getKVConfigByValue(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
		final RemotingCommand response = RemotingCommand.createResponseCommand(GetKVConfigResponseHeader.class);
		final GetKVConfigResponseHeader responseHeader = (GetKVConfigResponseHeader) response.readCustomHeader();
		final GetKVConfigRequestHeader requestHeader = (GetKVConfigRequestHeader) request.decodeCommandCustomHeader(GetKVConfigRequestHeader.class);

		String value = this.namesrvController.getKvConfigManager().getKVConfigByValue(//
				requestHeader.getNamespace(), //
				requestHeader.getKey()//
		);

		if (value != null) {
			responseHeader.setValue(value);
			response.setCode(ResponseCode.SUCCESS);
			response.setRemark(null);
			return response;
		}

		response.setCode(ResponseCode.QUERY_NOT_FOUND);
		response.setRemark("No config item, Namespace: " + requestHeader.getNamespace() + " Key: " + requestHeader.getKey());
		return response;
	}

	public RemotingCommand deleteKVConfigByValue(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
		final RemotingCommand response = RemotingCommand.createResponseCommand(null);
		final DeleteKVConfigRequestHeader requestHeader = (DeleteKVConfigRequestHeader) request.decodeCommandCustomHeader(DeleteKVConfigRequestHeader.class);

		this.namesrvController.getKvConfigManager().deleteKVConfigByValue(//
				requestHeader.getNamespace(), //
				requestHeader.getKey()//
		);

		response.setCode(ResponseCode.SUCCESS);
		response.setRemark(null);
		return response;
	}

	/**
	 * 获取指定集群下的全部Topic列表
	 * 
	 * @param ctx
	 * @param request
	 * @return
	 */
	private RemotingCommand getTopicsByCluster(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
		final RemotingCommand response = RemotingCommand.createResponseCommand(null);
		final GetTopicsByClusterRequestHeader requestHeader = (GetTopicsByClusterRequestHeader) request.decodeCommandCustomHeader(GetTopicsByClusterRequestHeader.class);

		byte[] body = this.namesrvController.getRouteInfoManager().getTopicsByCluster(requestHeader.getCluster());

		response.setBody(body);
		response.setCode(ResponseCode.SUCCESS);
		response.setRemark(null);
		return response;
	}

	/**
	 * 获取所有系统内置 Topic 列表
	 * 
	 * @param ctx
	 * @param request
	 * @return
	 * @throws RemotingCommandException
	 */
	private RemotingCommand getSystemTopicListFromNs(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
		final RemotingCommand response = RemotingCommand.createResponseCommand(null);

		byte[] body = this.namesrvController.getRouteInfoManager().getSystemTopicList();

		response.setBody(body);
		response.setCode(ResponseCode.SUCCESS);
		response.setRemark(null);
		return response;
	}

	/**
	 * 获取单元化逻辑 Topic 列表
	 * 
	 * @param ctx
	 * @param request
	 * @return
	 * @throws RemotingCommandException
	 */
	private RemotingCommand getUnitTopicList(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
		final RemotingCommand response = RemotingCommand.createResponseCommand(null);

		byte[] body = this.namesrvController.getRouteInfoManager().getUnitTopics();

		response.setBody(body);
		response.setCode(ResponseCode.SUCCESS);
		response.setRemark(null);
		return response;
	}

	/**
	 * 获取含有单元化订阅组的 Topic 列表
	 * 
	 * @param ctx
	 * @param request
	 * @return
	 * @throws RemotingCommandException
	 */
	private RemotingCommand getHasUnitSubTopicList(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
		final RemotingCommand response = RemotingCommand.createResponseCommand(null);

		byte[] body = this.namesrvController.getRouteInfoManager().getHasUnitSubTopicList();

		response.setBody(body);
		response.setCode(ResponseCode.SUCCESS);
		response.setRemark(null);
		return response;
	}

	/**
	 * 获取含有单元化订阅组的非单元化 Topic 列表
	 * 
	 * @param ctx
	 * @param request
	 * @return
	 * @throws RemotingCommandException
	 */
	private RemotingCommand getHasUnitSubUnUnitTopicList(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
		final RemotingCommand response = RemotingCommand.createResponseCommand(null);

		byte[] body = this.namesrvController.getRouteInfoManager().getHasUnitSubUnUnitTopicList();

		response.setBody(body);
		response.setCode(ResponseCode.SUCCESS);
		response.setRemark(null);
		return response;
	}
}
